RE: Perform streaming logical transactions by background workers and parallel apply
| От | houzj.fnst@fujitsu.com |
|---|---|
| Тема | RE: Perform streaming logical transactions by background workers and parallel apply |
| Дата | |
| Msg-id | OS0PR01MB57169DAA9A2A6E68EE5E05F094E19@OS0PR01MB5716.jpnprd01.prod.outlook.com обсуждение исходный текст |
| Ответ на | Re: Perform streaming logical transactions by background workers and parallel apply (Amit Kapila <amit.kapila16@gmail.com>) |
| Ответы |
Re: Perform streaming logical transactions by background workers and parallel apply
Re: Perform streaming logical transactions by background workers and parallel apply |
| Список | pgsql-hackers |
On Wednesday, December 14, 2022 2:49 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> On Wed, Dec 14, 2022 at 9:50 AM houzj.fnst@fujitsu.com
> <houzj.fnst@fujitsu.com> wrote:
> >
> > On Tuesday, December 13, 2022 11:25 PM Masahiko Sawada
> <sawada.mshk@gmail.com> wrote:
> > >
> > > Here are comments on v59 0001, 0002 patches:
> >
> > Thanks for the comments!
> >
> > > +void
> > > +pa_increment_stream_block(ParallelApplyWorkerShared *wshared) {
> > > + while (1)
> > > + {
> > > + SpinLockAcquire(&wshared->mutex);
> > > +
> > > + /*
> > > + * Don't try to increment the count if the parallel
> > > apply worker is
> > > + * taking the stream lock. Otherwise, there would
> > > + be
> > > a race condition
> > > + * that the parallel apply worker checks there is
> > > + no
> > > pending streaming
> > > + * block and before it actually starts waiting on a
> > > lock, the leader
> > > + * sends another streaming block and take the
> > > + stream
> > > lock again. In
> > > + * this case, the parallel apply worker will start
> > > waiting for the next
> > > + * streaming block whereas there is actually a
> > > pending streaming block
> > > + * available.
> > > + */
> > > + if (!wshared->pa_wait_for_stream)
> > > + {
> > > + wshared->pending_stream_count++;
> > > + SpinLockRelease(&wshared->mutex);
> > > + break;
> > > + }
> > > +
> > > + SpinLockRelease(&wshared->mutex);
> > > + }
> > > +}
> > >
> > > I think we should add an assertion to check if we don't hold the stream lock.
> > >
> > > I think that waiting for pa_wait_for_stream to be false in a busy
> > > loop is not a good idea. It's not interruptible and there is not
> > > guarantee that we can break from this loop in a short time. For
> > > instance, if PA executes
> > > pa_decr_and_wait_stream_block() a bit earlier than LA executes
> > > pa_increment_stream_block(), LA has to wait for PA to acquire and
> > > release the stream lock in a busy loop. It should not be long in
> > > normal cases but the duration LA needs to wait for PA depends on PA,
> > > which could be long. Also what if PA raises an error in
> > > pa_lock_stream() due to some reasons? I think LA won't be able to
> > > detect the failure.
> > >
> > > I think we should at least make it interruptible and maybe need to
> > > add some sleep. Or perhaps we can use the condition variable for this case.
> >
>
> Or we can leave this while (true) logic altogether for the first version and have a
> comment to explain this race. Anyway, after restarting, it will probably be
> solved. We can always change this part of the code later if this really turns out
> to be problematic.
Agreed, and reverted this part.
>
> > Thanks for the analysis, I will research this part.
> >
> > > ---
> > > In worker.c, we have the following common pattern:
> > >
> > > case TRANS_LEADER_PARTIAL_SERIALIZE:
> > > write change to the file;
> > > do some work;
> > > break;
> > >
> > > case TRANS_LEADER_SEND_TO_PARALLEL:
> > > pa_send_data();
> > >
> > > if (winfo->serialize_changes)
> > > {
> > > do some worker required after writing changes to the file.
> > > }
> > > :
> > > break;
> > >
> > > IIUC there are two different paths for partial serialization: (a)
> > > where apply_action is TRANS_LEADER_PARTIAL_SERIALIZE, and (b) where
> > > apply_action is TRANS_LEADER_PARTIAL_SERIALIZE and
> > > winfo->serialize_changes became true. And we need to match what we
> > > winfo->do
> > > in (a) and (b). Rather than having two different paths for the same
> > > case, how about falling through TRANS_LEADER_PARTIAL_SERIALIZE when
> > > we could not send the changes? That is, pa_send_data() just returns
> > > false when the timeout exceeds and we need to switch to serialize
> > > changes, otherwise returns true. If it returns false, we prepare for
> > > switching to serialize changes such as initializing fileset, and
> > > fall through TRANS_LEADER_PARTIAL_SERIALIZE case. The code would be
> like:
> > >
> > > case TRANS_LEADER_SEND_TO_PARALLEL:
> > > ret = pa_send_data();
> > >
> > > if (ret)
> > > {
> > > do work for sending changes to PA.
> > > break;
> > > }
> > >
> > > /* prepare for switching to serialize changes */
> > > winfo->serialize_changes = true;
> > > initialize fileset;
> > > acquire stream lock if necessary;
> > >
> > > /* FALLTHROUGH */
> > > case TRANS_LEADER_PARTIAL_SERIALIZE:
> > > do work for serializing changes;
> > > break;
> >
> > I think that the suggestion is to extract the code that switch to
> > serialize mode out of the pa_send_data(), and then we need to add that
> > logic in all the functions which call pa_send_data(), I am not sure if
> > it looks better as it might introduce some more codes in each handling
> function.
> >
>
> How about extracting the common code from apply_handle_stream_commit
> and apply_handle_stream_prepare to a separate function say
> pa_xact_finish_common()? I see there is a lot of common code (unlock the
> stream, wait for the finish, store flush location, free worker
> info) in both the functions for TRANS_LEADER_PARTIAL_SERIALIZE and
> TRANS_LEADER_SEND_TO_PARALLEL cases.
Agreed, changed. I also addressed Sawada-san comment by extracting the
code that switch to serialize out of pa_send_data().
> >
> > > ---
> > > void
> > > pa_lock_stream(TransactionId xid, LOCKMODE lockmode) {
> > > LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
> > > PARALLEL_APPLY_LOCK_STREAM,
> > > lockmode); }
> > >
> > > I think since we don't need to let the caller to specify the lock
> > > mode but need only shared and exclusive modes, we can make it simple
> > > by having a boolean argument say shared instead of lockmode.
> >
> > I personally think passing the lockmode would make the code more clear
> > than passing a Boolean value.
> >
>
> +1.
>
> I have made a few changes in the newly added comments and function name in
> the attached patch. Kindly include this if you find the changes okay.
Thanks, I have checked and merged it.
Attach the new version patch set which addressed all comments so far.
Best regards,
Hou zj
Вложения
- v61-0007-Add-a-main_worker_pid-to-pg_stat_subscription.patch
- v61-0001-Perform-streaming-logical-transactions-by-parall.patch
- v61-0002-Serialize-partial-changes-to-a-file-when-the-att.patch
- v61-0003-Test-streaming-parallel-option-in-tap-test.patch
- v61-0004-Allow-streaming-every-change-without-waiting-til.patch
- v61-0005-Add-GUC-stream_serialize_threshold-and-test-seri.patch
- v61-0006-Retry-to-apply-streaming-xact-only-in-apply-work.patch
В списке pgsql-hackers по дате отправления: