Re: Perform streaming logical transactions by background workers and parallel apply
From | Peter Smith |
---|---|
Subject | Re: Perform streaming logical transactions by background workers and parallel apply |
Date | |
Msg-id | CAHut+PuAxW57fowiMrn=3=53sagmehiTSW0o1Q52MpR3phUmyw@mail.gmail.com |
In reply to | RE: Perform streaming logical transactions by background workers and parallel apply ("shiy.fnst@fujitsu.com" <shiy.fnst@fujitsu.com>) |
Responses |
RE: Perform streaming logical transactions by background workers and parallel apply
("wangw.fnst@fujitsu.com" <wangw.fnst@fujitsu.com>)
|
List | pgsql-hackers |
"Here are my review comments for v6-0001. ====== 1. General I saw that now in most places you are referring to the new kind of worker as the "apply background worker". But there are a few comments remaining that still refer to "bgworker". Please search the entire patch for "bgworker" in the comments and replace them with "apply background worker". ====== 2. Commit message We also need to allow stream_stop to complete by the apply background worker to finish it to avoid deadlocks because T-1's current stream of changes can update rows in conflicting order with T-2's next stream of changes. Something is not right with this wording: "to complete by the apply background worker to finish it...". Maybe just omit the words "to finish it" (??). ~~~ 3. Commit message This patch also extends the subscription streaming option so that... SUGGESTION This patch also extends the SUBSCRIPTION 'streaming' option so that... ====== 4. src/backend/commands/subscriptioncmds.c - defGetStreamingMode +/* + * Extract the streaming mode value from a DefElem. This is like + * defGetBoolean() but also accepts the special value and "apply". + */ +static char +defGetStreamingMode(DefElem *def) Typo: "special value and..." -> "special value of..." ====== 5. src/backend/replication/logical/launcher.c - logicalrep_worker_launch + + if (subworker_dsm == DSM_HANDLE_INVALID) + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyWorkerMain"); + else + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyBgworkerMain"); + + 5a. This condition should be using the new 'is_subworker' bool 5b. Double blank lines? ~~~ 6. 
src/backend/replication/logical/launcher.c - logicalrep_worker_launch

-	else
+	else if (subworker_dsm == DSM_HANDLE_INVALID)
 		snprintf(bgw.bgw_name, BGW_MAXLEN,
 				 "logical replication worker for subscription %u", subid);
+	else
+		snprintf(bgw.bgw_name, BGW_MAXLEN,
+				 "logical replication apply worker for subscription %u", subid);
 	snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication worker");

This condition should also be using the new 'is_subworker' bool.

~~~

7. src/backend/replication/logical/launcher.c - logicalrep_worker_stop_internal

+
+	Assert(LWLockHeldByMe(LogicalRepWorkerLock));
+

I think there should be a comment here to say that this lock is required/expected to be released by the caller of this function.

======

8. src/backend/replication/logical/origin.c - replorigin_session_setup

@@ -1068,7 +1068,7 @@ ReplicationOriginExitCleanup(int code, Datum arg)
  * with replorigin_session_reset().
  */
 void
-replorigin_session_setup(RepOriginId node)
+replorigin_session_setup(RepOriginId node, bool acquire)
 {

This function has been problematic for several reviews. I saw that you removed the previously confusing comment, but I still feel some kind of explanation is needed for the vague 'acquire' parameter. OTOH, if you just changed the param name to 'must_acquire', then I think it would be self-explanatory.

======

9. src/backend/replication/logical/worker.c - General

Some of the logs have a prefix "[Apply BGW #%u]" and some do not; I did not really understand how you decided whether to prefix or not, so I did not comment about them individually. Are they all OK? If you can explain the reason for the choices, I can review them better next time.

~~~

10. src/backend/replication/logical/worker.c - General

There are multiple places in the code with if/else checks for the apply background worker versus the normal apply worker. In those places there is often a comment like:

"If we are in main apply worker..."

But it is redundant to say "If we are" because we know we are.
Instead, those cases should have a comment at the top of the else, like:

/* This is the main apply worker. */

And then the "If we are in main apply worker" text can be removed from the comment. There are many examples like this in the patch. Please search for and modify all of them.

~~~

11. src/backend/replication/logical/worker.c - file header comment

The whole comment is similar to the commit message, so any changes made there (for #2, #3) should be made here also.

~~~

12. src/backend/replication/logical/worker.c

+typedef struct WorkerEntry
+{
+	TransactionId xid;
+	WorkerState *wstate;
+} WorkerEntry;

Missing comment for this structure.

~~~

13. src/backend/replication/logical/worker.c

WorkerState
WorkerEntry

I felt that these struct names seem too generic; shouldn't they be something more like ApplyBgworkerState and ApplyBgworkerEntry?

~~~

14. src/backend/replication/logical/worker.c

+static List *ApplyWorkersIdleList = NIL;

IMO ApplyWorkersFreeList is a better name than IdleList for this. "Idle" sounds like it is just paused rather than available for someone else to use. If you change this, then please search the rest of the patch for mentions in log messages etc.

~~~

15. src/backend/replication/logical/worker.c

+static WorkerState *stream_apply_worker = NULL;
+
+/* check if we apply transaction in apply bgworker */
+#define apply_bgworker_active() (in_streamed_transaction && stream_apply_worker != NULL)

Wording: "if we apply transaction" -> "if we are applying the transaction"

~~~

16. src/backend/replication/logical/worker.c - handle_streamed_transaction

+ * For the main apply worker, if in streaming mode (receiving a block of
+ * streamed transaction), we send the data to the apply background worker.
+ *
+ * For the apply background worker, define a savepoint if new subtransaction
+ * was started.
  *
  * Returns true for streamed transactions, false otherwise (regular mode).
  */
 static bool
 handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)

16a.
Typo: "if new subtransaction" -> "if a new subtransaction"

16b. That "regular mode" comment does not seem quite right because IIUC it also returns false for an apply background worker (which hardly seems like a "regular mode").

~~~

17. src/backend/replication/logical/worker.c - handle_streamed_transaction

-	/* not in streaming mode */
-	if (!in_streamed_transaction)
+	/*
+	 * Return if we are not in streaming mode and are not in an apply
+	 * background worker.
+	 */
+	if (!in_streamed_transaction && !am_apply_bgworker())
 		return false;

Somehow I found this condition confusing, and the comment is not helpful either because it just says exactly what the code says. Can you give a better explanatory comment?

e.g. Maybe the comment should be: "Return if not in streaming mode (unless this is an apply background worker)"

e.g. Maybe the condition is easier to understand if written as:
if (!(in_streamed_transaction || am_apply_bgworker()))

~~~

18. src/backend/replication/logical/worker.c - handle_streamed_transaction

+			if (action == LOGICAL_REP_MSG_RELATION)
+			{
+				LogicalRepRelation *rel = logicalrep_read_rel(s);
+				logicalrep_relmap_update(rel);
+			}
+
+		}
+		else
+		{
+			/* Add the new subxact to the array (unless already there). */
+			subxact_info_add(current_xid);

Unnecessary blank line.

~~~

19. src/backend/replication/logical/worker.c - find_or_start_apply_bgworker

+	if (found)
+	{
+		entry->wstate->pstate->state = APPLY_BGWORKER_BUSY;
+		return entry->wstate;
+	}
+	else if (!start)
+		return NULL;
+
+	/* If there is at least one worker in the idle list, then take one. */
+	if (list_length(ApplyWorkersIdleList) > 0)

I felt that there should be a comment (after the return NULL) that says:

/*
 * Start a new apply background worker
 */

~~~

20. src/backend/replication/logical/worker.c - apply_bgworker_free

+/*
+ * Add the worker to the freelist and remove the entry from hash table.
+ */
+static void
+apply_bgworker_free(WorkerState *wstate)

20a. Typo: "freelist" -> "free list"

20b.
Elsewhere (and in the log message) this is called the idle list (but actually I prefer "free list" like in this comment). See also comment #14.

~~~

21. src/backend/replication/logical/worker.c - apply_bgworker_free

+	hash_search(ApplyWorkersHash, &xid,
+				HASH_REMOVE, &found);

21a. If you are not going to check the value of 'found', then why bother to pass this param at all; can't you just pass NULL? (I think I asked the same question in a previous review.)

21b. The wrapping over 2 lines seems unnecessary here.

~~~

22. src/backend/replication/logical/worker.c - apply_handle_stream_start

 	/*
-	 * Initialize the worker's stream_fileset if we haven't yet. This will be
-	 * used for the entire duration of the worker so create it in a permanent
-	 * context. We create this on the very first streaming message from any
-	 * transaction and then use it for this and other streaming transactions.
-	 * Now, we could create a fileset at the start of the worker as well but
-	 * then we won't be sure that it will ever be used.
+	 * If we are in main apply worker, check if there is any free bgworker
+	 * we can use to process this transaction.
 	 */
-	if (MyLogicalRepWorker->stream_fileset == NULL)
+	stream_apply_worker = apply_bgworker_find_or_start(stream_xid, first_segment);

22a. Typo: "in main apply worker" -> "in the main apply worker"

22b. Since this is not if/else code, it might be better to put Assert(!am_apply_bgworker()); above this just to make it clearer.

~~~

23. src/backend/replication/logical/worker.c - apply_handle_stream_start

+		/*
+		 * If we have free worker or we already started to apply this
+		 * transaction in bgworker, we pass the data to worker.
+		 */

SUGGESTION
If we have found a free worker or if we are already applying this transaction in an apply background worker, then we pass the data to that worker.

~~~

24.
src/backend/replication/logical/worker.c - apply_handle_stream_abort

+apply_handle_stream_abort(StringInfo s)
 {
-	StringInfoData s2;
-	int nchanges;
-	char path[MAXPGPATH];
-	char *buffer = NULL;
-	MemoryContext oldcxt;
-	BufFile *fd;
+	TransactionId xid;
+	TransactionId subxid;
-	maybe_start_skipping_changes(lsn);
+	if (in_streamed_transaction)
+		ereport(ERROR,
+				(errcode(ERRCODE_PROTOCOL_VIOLATION),
+				 errmsg_internal("STREAM COMMIT message without STREAM STOP")));

Typo? Shouldn't that errmsg say "STREAM ABORT message..." instead of "STREAM COMMIT message..."?

~~~

25. src/backend/replication/logical/worker.c - apply_handle_stream_abort

+		for(i = list_length(subxactlist) - 1; i >= 0; i--)
+		{

Missing space after "for".

~~~

26. src/backend/replication/logical/worker.c - apply_handle_stream_abort

+			if (found)
+			{
+				elog(LOG, "rolled back to savepoint %s", spname);
+				RollbackToSavepoint(spname);
+				CommitTransactionCommand();
+				subxactlist = list_truncate(subxactlist, i + 1);
+			}

Does this need to log anything if nothing was found? Or is it OK to leave as-is and silently ignore it?

~~~

27. src/backend/replication/logical/worker.c - LogicalApplyBgwLoop

+			if (len == 0)
+			{
+				elog(LOG, "[Apply BGW #%u] got zero-length message, stopping", pst->n);
+				break;
+			}

Maybe it is unnecessary to say "stopping" because it will say that in the next log anyway when it breaks out of the main loop.

~~~

28. src/backend/replication/logical/worker.c - LogicalApplyBgwLoop

+				default:
+					elog(ERROR, "unexpected message");
+					break;

Perhaps the switch byte should be stored in a variable so that you can log which unexpected byte code was received, e.g. similar to the apply_handle_tuple_routing function.

~~~

29. src/backend/replication/logical/worker.c - LogicalApplyBgwMain

+	/*
+	 * The apply bgworker don't need to monopolize this replication origin
+	 * which was already acquired by its leader process.
+	 */
+	replorigin_session_setup(originid, false);
+	replorigin_session_origin = originid;
+	CommitTransactionCommand();

Typo: "The apply bgworker don't need ..." -> "The apply background workers don't need ..." or -> "The apply background worker doesn't need ..."

~~~

30. src/backend/replication/logical/worker.c - apply_bgworker_setup

+/*
+ * Start apply worker background worker process and allocate shared memory for
+ * it.
+ */
+static WorkerState *
+apply_bgworker_setup(void)

Typo: "apply worker background worker process" -> "apply background worker process"

~~~

31. src/backend/replication/logical/worker.c - apply_bgworker_wait_for

+		/* If any workers (or the postmaster) have died, we have failed. */
+		if (status == APPLY_BGWORKER_EXIT)
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("Background worker %u failed to apply transaction %u",
+							wstate->pstate->n, wstate->pstate->stream_xid)));

The errmsg should start with a lowercase letter.

~~~

32. src/backend/replication/logical/worker.c - check_workers_status

+		/*
+		 * We don't lock here as in the worst case we will just detect the
+		 * failure of worker a bit later.
+		 */
+		if (wstate->pstate->state == APPLY_BGWORKER_EXIT)
+			ereport(ERROR,
+					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					 errmsg("Background worker %u exited unexpectedly",
+							wstate->pstate->n)));

The errmsg should start with a lowercase letter.

~~~

33. src/backend/replication/logical/worker.c - check_workers_status

+/* Set the state of apply background worker */
+static void
+apply_bgworker_set_state(char state)

Maybe OK, or perhaps choose from one of:
- "Set the state of an apply background worker"
- "Set the apply background worker state"

======

34.
src/bin/pg_dump/pg_dump.c - getSubscriptions

@@ -4450,7 +4450,7 @@ getSubscriptions(Archive *fout)
 	if (fout->remoteVersion >= 140000)
 		appendPQExpBufferStr(query, " s.substream,\n");
 	else
-		appendPQExpBufferStr(query, " false AS substream,\n");
+		appendPQExpBufferStr(query, " 'f' AS substream,\n");

Is that logic right? Before this patch the attribute was bool; now it is char. So doesn't there need to be some conversion/mapping here for when you read from a server >= 140000 where the attribute was still bool, so that 'false' -> 'f' and 'true' -> 't'?

======

35. src/include/replication/origin.h

@@ -53,7 +53,7 @@ extern XLogRecPtr replorigin_get_progress(RepOriginId node, bool flush);
 extern void replorigin_session_advance(XLogRecPtr remote_commit,
 									   XLogRecPtr local_commit);
-extern void replorigin_session_setup(RepOriginId node);
+extern void replorigin_session_setup(RepOriginId node, bool acquire);

As previously suggested in comment #8, maybe the 2nd param should be 'must_acquire'.

======

36. src/include/replication/worker_internal.h

@@ -60,6 +60,8 @@ typedef struct LogicalRepWorker
 	 */
 	FileSet    *stream_fileset;

+	bool		subworker;
+

Probably this new member deserves a comment.

------
Kind Regards,
Peter Smith.
Fujitsu Australia