RE: Perform streaming logical transactions by background workers and parallel apply

Поиск
Список
Период
Сортировка
От wangw.fnst@fujitsu.com
Тема RE: Perform streaming logical transactions by background workers and parallel apply
Дата
Msg-id OS3PR01MB6275797F66EF0A47EEB2D8FC9EDD9@OS3PR01MB6275.jpnprd01.prod.outlook.com
обсуждение исходный текст
Ответ на Re: Perform streaming logical transactions by background workers and parallel apply  (Peter Smith <smithpb2250@gmail.com>)
Ответы Re: Perform streaming logical transactions by background workers and parallel apply  (Amit Kapila <amit.kapila16@gmail.com>)
Список pgsql-hackers
On Wed, May 18, 2022 3:11 PM Peter Smith <smithpb2250@gmail.com> wrote:
> "Here are my review comments for v6-0001.
Thanks for your comments.

> 7. src/backend/replication/logical/launcher.c - logicalrep_worker_stop_internal
> 
> +
> + Assert(LWLockHeldByMe(LogicalRepWorkerLock));
> +
> 
> I think there should be a comment here to say that this lock is
> required/expected to be released by the caller of this function.
IMHO, it maybe not a problem to read code here.
In addition, keep consistent with other places where invoke this function in
the same file. So I did not change this.

> 9. src/backend/replication/logical/worker.c - General
> 
> Some of the logs have a prefix "[Apply BGW #%u]" and some do not; I
> did not really understand how you decided to prefix or not so I did
> not comment about them individually. Are they all OK? Perhaps if you
> can explain the reason for the choices I can review it better next
> time.
I think most of these logs should be logged in debug mode. So I changed them to
"DEBUG1" level.
And I added the prefix to all messages logged by apply background worker and
deleted some logs that I think maybe not very helpful. 

> 11. src/backend/replication/logical/worker.c - file header comment
> 
> The whole comment is similar to the commit message so any changes made
> there (for #2, #3) should be made here also.
Improve the comments as suggested in #2.
Sorry but I did not find same message as #2 here.

> 13. src/backend/replication/logical/worker.c
> 
> WorkerState
> WorkerEntry
> 
> I felt that these struct names seem too generic - shouldn't they be
> something more like ApplyBgworkerState, ApplyBgworkerEntry
> 
> ~~~
I think we have used "ApplyBgworkerState" in the patch. So I improved this with
the following modifications:
```
ApplyBgworkerState -> ApplyBgworkerStatus
WorkerState -> ApplyBgworkerState
WorkerEntry -> ApplyBgworkerEntry
```
BTW, I also modified the relevant comments and variable names.

> 16. src/backend/replication/logical/worker.c - handle_streamed_transaction
> 
> + * For the main apply worker, if in streaming mode (receiving a block of
> + * streamed transaction), we send the data to the apply background worker.
> + *
> + * For the apply background worker, define a savepoint if new subtransaction
> + * was started.
>   *
>   * Returns true for streamed transactions, false otherwise (regular mode).
>   */
>  static bool
>  handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
> 
> 16a.
> Typo: "if new subtransaction" -> "if a new subtransaction"
> 
> 16b.
> That "regular mode" comment seems not quite right because IIUC it also
> returns false also for a bgworker (which hardly seems like a "regular
> mode")
16a. Improved it as suggested.
16b. Changed the comment as follows:
From:
```
* Returns true for streamed transactions, false otherwise (regular mode).
```
To:
```
 * For non-streamed transactions, returns false;
 * For streamed transactions, returns true if in main apply worker, false
 * otherwise.
```

> 19. src/backend/replication/logical/worker.c - find_or_start_apply_bgworker
> 
> + if (found)
> + {
> + entry->wstate->pstate->state = APPLY_BGWORKER_BUSY;
> + return entry->wstate;
> + }
> + else if (!start)
> + return NULL;
> +
> + /* If there is at least one worker in the idle list, then take one. */
> + if (list_length(ApplyWorkersIdleList) > 0)
> 
> I felt that there should be a comment (after the return NULL) that says:
> 
> /*
>  * Start a new apply background worker
>  */
> 
> ~~~
Improve this comment here.
After the code that you mentioned, it will try to get a apply background
worker (try to start one or take one from idle list). So I change the comment
as follows:
From:
```
/* If there is at least one worker in the idle list, then take one. */
```
To:
```
/*
 * Now, we try to get a apply background worker.
 * If there is at least one worker in the idle list, then take one.
 * Otherwise, we try to start a new apply background worker.
 */
```

> 22. src/backend/replication/logical/worker.c - apply_handle_stream_start
> 
>   /*
> - * Initialize the worker's stream_fileset if we haven't yet. This will be
> - * used for the entire duration of the worker so create it in a permanent
> - * context. We create this on the very first streaming message from any
> - * transaction and then use it for this and other streaming transactions.
> - * Now, we could create a fileset at the start of the worker as well but
> - * then we won't be sure that it will ever be used.
> + * If we are in main apply worker, check if there is any free bgworker
> + * we can use to process this transaction.
>   */
> - if (MyLogicalRepWorker->stream_fileset == NULL)
> + stream_apply_worker = apply_bgworker_find_or_start(stream_xid,
> first_segment);
> 
> 22a.
> Typo: "in main apply worker" -> "in the main apply worker"
> 
> 22b.
> Since this is not if/else code, it might be better to put
> Assert(!am_apply_bgworker()); above this just to make it more clear.
22a. Improved it as suggested.
22b. 
IMHO, since we have `if (am_apply_bgworker())` above and it will return in this
if-condition, so I just think Assert() might be a bit redundant here.
So I did not change this.
 
> 26. src/backend/replication/logical/worker.c - apply_handle_stream_abort
> 
> + if (found)
> + {
> + elog(LOG, "rolled back to savepoint %s", spname);
> + RollbackToSavepoint(spname);
> + CommitTransactionCommand();
> + subxactlist = list_truncate(subxactlist, i + 1);
> + }
> 
> Does this need to log anything if nothing was found? Or is it ok to
> leave as-is and silently ignore it?
Yes, I think it is okay.

> 33. src/backend/replication/logical/worker.c - check_workers_status
> 
> +/* Set the state of apply background worker */
> +static void
> +apply_bgworker_set_state(char state)
> 
> Maybe OK, or perhaps choose from one of:
> - "Set the state of an apply background worker"
> - "Set the apply background worker state"
Improve it by using the second one.

> 34. src/bin/pg_dump/pg_dump.c - getSubscriptions
> 
> @@ -4450,7 +4450,7 @@ getSubscriptions(Archive *fout)
>   if (fout->remoteVersion >= 140000)
>   appendPQExpBufferStr(query, " s.substream,\n");
>   else
> - appendPQExpBufferStr(query, " false AS substream,\n");
> + appendPQExpBufferStr(query, " 'f' AS substream,\n");
> 
> 
> Is that logic right? Before this patch the attribute was bool; now it
> is char. So doesn't there need to be some conversion/mapping here for
> when you read from >= 140000 but it was still bool so you need to
> convert 'false' -> 'f' and 'true' -> 't'?
Yes, I think it is right.
We could handle the input of option "streaming" : on/true/off/false/apply.

The rest of the comments are improved as suggested.


And thanks for Shi Yu to improve the patch 0002 by addressing the comments in
[1].

Attach the new patches(only changed 0001 and 0002)

[1] - https://www.postgresql.org/message-id/CAHut%2BPv_0nfUxriwxBQnZTOF5dy5nfG5NtWMr8e00mPrt2Vjzw%40mail.gmail.com

Regards,
Wang wei

Вложения

В списке pgsql-hackers по дате отправления:

Предыдущее
От: Yugo NAGATA
Дата:
Сообщение: Re: Prevent writes on large objects in read-only transactions
Следующее
От: Andrey Borodin
Дата:
Сообщение: Amcheck verification of GiST and GIN