Re: Parallel copy

From: vignesh C
Subject: Re: Parallel copy
Date:
Msg-id: CALDaNm0=JkZ9sAaiFMw--=UrNrPuhXX0rn5RDgeEsHmwOpwuRQ@mail.gmail.com
In reply to: Re: Parallel copy  (Amit Kapila <amit.kapila16@gmail.com>)
Responses: Re: Parallel copy  (Amit Kapila <amit.kapila16@gmail.com>)
List: pgsql-hackers

Thanks for your comments, Amit. I have worked on them; my thoughts on each are mentioned below.

On Tue, Jul 21, 2020 at 3:54 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
>
> On Fri, Jul 17, 2020 at 2:09 PM vignesh C <vignesh21@gmail.com> wrote:
> >
> > >
> > > Please find the updated patch with the fixes included.
> > >
> >
> > Patch 0003-Allow-copy-from-command-to-process-data-from-file-ST.patch
> > had few indentation issues, I have fixed and attached the patch for
> > the same.
> >
>
> Ensure to use the version with each patch-series as that makes it
> easier for the reviewer to verify the changes done in the latest
> version of the patch.  One way is to use commands like "git
> format-patch -6 -v <version_of_patch_series>" or you can add the
> version number manually.
>

Taken care of.

> Review comments:
> ===================
>
> 0001-Copy-code-readjustment-to-support-parallel-copy
> 1.
> @@ -807,8 +835,11 @@ CopyLoadRawBuf(CopyState cstate)
>   else
>   nbytes = 0; /* no data need be saved */
>
> + if (cstate->copy_dest == COPY_NEW_FE)
> + minread = RAW_BUF_SIZE - nbytes;
> +
>   inbytes = CopyGetData(cstate, cstate->raw_buf + nbytes,
> -   1, RAW_BUF_SIZE - nbytes);
> +   minread, RAW_BUF_SIZE - nbytes);
>
> No comment to explain why this change is done?
>
> 0002-Framework-for-leader-worker-in-parallel-copy

Currently CopyGetData copies less data into the buffer than the available space allows, because minread was passed as 1 to CopyGetData. Because of this there are frequent calls to CopyGetData to fetch the data. It loads only part of the data due to the below check:
while (maxread > 0 && bytesread < minread && !cstate->reached_eof)
After reading some data, bytesread becomes greater than minread (which is passed as 1), so the function returns with a small amount of data even though there is still space in the buffer.
This change is required for the parallel copy feature: each time we get a new DSM data block of 64K size and copy the data into it. If we copy less data into each DSM data block, we might end up consuming all the DSM data blocks. I felt this issue can be fixed as part of HEAD and have posted a separate thread [1] for it. I'm planning to remove that change once it gets committed. Can that go as a separate
patch or should we include it here?
[1] - https://www.postgresql.org/message-id/CALDaNm0v4CjmvSnftYnx_9pOS_dKRG%3DO3NnBgJsQmi0KipvLog%40mail.gmail.com
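To illustrate, here is a minimal sketch of the relevant read loop in CopyGetData for the COPY_NEW_FE case (simplified; the message-refill and error-handling code is omitted):

while (maxread > 0 && bytesread < minread && !cstate->reached_eof)
{
	int		avail;

	/* ... refill cstate->fe_msgbuf from the client if it is empty ... */

	avail = cstate->fe_msgbuf->len - cstate->fe_msgbuf->cursor;
	if (avail > maxread)
		avail = maxread;
	pq_copymsgbytes(cstate->fe_msgbuf, databuf, avail);
	databuf += avail;
	maxread -= avail;
	bytesread += avail;
}

With minread = 1 the loop exits after the first CopyData message even when plenty of buffer space remains; with minread = RAW_BUF_SIZE - nbytes it keeps reading until the buffer (and hence the 64K DSM block) is filled or EOF is reached.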

> 2.
> + * ParallelCopyLineBoundary is common data structure between leader & worker,
> + * Leader process will be populating data block, data block offset &
> the size of
> + * the record in DSM for the workers to copy the data into the relation.
> + * This is protected by the following sequence in the leader & worker. If they
> + * don't follow this order the worker might process wrong line_size and leader
> + * might populate the information which worker has not yet processed or in the
> + * process of processing.
> + * Leader should operate in the following order:
> + * 1) check if line_size is -1, if not wait, it means worker is still
> + * processing.
> + * 2) set line_state to LINE_LEADER_POPULATING.
> + * 3) update first_block, start_offset & cur_lineno in any order.
> + * 4) update line_size.
> + * 5) update line_state to LINE_LEADER_POPULATED.
> + * Worker should operate in the following order:
> + * 1) check line_state is LINE_LEADER_POPULATED, if not it means
> leader is still
> + * populating the data.
> + * 2) read line_size.
> + * 3) only one worker should choose one line for processing, this is handled by
> + *    using pg_atomic_compare_exchange_u32, worker will change the sate to
> + *    LINE_WORKER_PROCESSING only if line_state is LINE_LEADER_POPULATED.
> + * 4) read first_block, start_offset & cur_lineno in any order.
> + * 5) process line_size data.
> + * 6) update line_size to -1.
> + */
> +typedef struct ParallelCopyLineBoundary
>
> Are we doing all this state management to avoid using locks while
> processing lines?  If so, I think we can use either spinlock or LWLock
> to keep the main patch simple and then provide a later patch to make
> it lock-less.  This will allow us to first focus on the main design of
> the patch rather than trying to make this datastructure processing
> lock-less in the best possible way.
>

The steps would be more or less the same if we used a spinlock too: steps 1, 3 & 4 would still be needed, and we would have lock & unlock in place of steps 2 & 5. I feel we can retain the current implementation.
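For reference, the worker side of step 3 condenses to something like the following sketch (field and state names are the ones from the quoted comment; the retry loop and error handling are omitted):

uint32		expected = LINE_LEADER_POPULATED;

/* Only one worker can win the exchange and claim this line. */
if (pg_atomic_compare_exchange_u32(&lineInfo->line_state,
								   &expected,
								   LINE_WORKER_PROCESSING))
{
	uint32		line_size = pg_atomic_read_u32(&lineInfo->line_size);

	/* ... read first_block, start_offset & cur_lineno and copy the line ... */

	/* Mark the slot free again so the leader can reuse it (step 6). */
	pg_atomic_write_u32(&lineInfo->line_size, -1);
}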

> 3.
> + /*
> + * Actual lines inserted by worker (some records will be filtered based on
> + * where condition).
> + */
> + pg_atomic_uint64 processed;
> + pg_atomic_uint64 total_worker_processed; /* total processed records
> by the workers */
>
> The difference between processed and total_worker_processed is not
> clear.  Can we expand the comments a bit?
>

Fixed
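For what it's worth, the expanded wording reads roughly like this (a sketch of my understanding of the two counters; the exact comments in the patch may differ):

/*
 * Lines actually inserted by the workers; lines filtered out by the
 * WHERE clause are not counted here, to match non-parallel COPY's
 * "processed" count.
 */
pg_atomic_uint64	processed;

/*
 * Total lines picked up and processed by the workers, including lines
 * that the WHERE clause filtered out; the leader compares this against
 * the number of lines it populated to know when all work is done.
 */
pg_atomic_uint64	total_worker_processed;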

> 4.
> + * SerializeList - Insert a list into shared memory.
> + */
> +static void
> +SerializeList(ParallelContext *pcxt, int key, List *inputlist,
> +   Size est_list_size)
> +{
> + if (inputlist != NIL)
> + {
> + ParallelCopyKeyListInfo *sharedlistinfo = (ParallelCopyKeyListInfo
> *)shm_toc_allocate(pcxt->toc,
> + est_list_size);
> + CopyListSharedMemory(inputlist, est_list_size, sharedlistinfo);
> + shm_toc_insert(pcxt->toc, key, sharedlistinfo);
> + }
> +}
>
> Why do we need to write a special mechanism (CopyListSharedMemory) to
> serialize a list.  Why can't we use nodeToString?  It should be able
> to take care of List datatype, see outNode which is called from
> nodeToString.  Once you do that, I think you won't need even
> EstimateLineKeysList, strlen should work instead.
>
> Check, if you have any similar special handling for other types that
> can be dealt with nodeToString?
>

Fixed
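With nodeToString the helper reduces to something like this sketch (assuming the same key scheme as before; the worker side restores the list with stringToNode):

/*
 * SerializeList - Insert a list into shared memory as a string
 * produced by nodeToString.
 */
static void
SerializeList(ParallelContext *pcxt, int key, List *inputlist)
{
	if (inputlist != NIL)
	{
		char	   *liststr = nodeToString(inputlist);
		char	   *shmptr = (char *) shm_toc_allocate(pcxt->toc,
													   strlen(liststr) + 1);

		strcpy(shmptr, liststr);
		shm_toc_insert(pcxt->toc, key, shmptr);
	}
}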

> 5.
> + MemSet(shared_info_ptr, 0, est_shared_info);
> + shared_info_ptr->is_read_in_progress = true;
> + shared_info_ptr->cur_block_pos = -1;
> + shared_info_ptr->full_transaction_id = full_transaction_id;
> + shared_info_ptr->mycid = GetCurrentCommandId(true);
> + for (count = 0; count < RINGSIZE; count++)
> + {
> + ParallelCopyLineBoundary *lineInfo =
> &shared_info_ptr->line_boundaries.ring[count];
> + pg_atomic_init_u32(&(lineInfo->line_size), -1);
> + }
> +
>
> You can move this initialization in a separate function.
>

Fixed
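The split-out function looks roughly like the following sketch (the function and struct names here are mine; the field names are the ones from the quoted hunk):

/*
 * Initialize the shared copy state and mark every entry in the line
 * boundary ring as free (line_size = -1).
 */
static void
InitializeParallelCopyShmInfo(ParallelCopyShmInfo *shared_info_ptr,
							  FullTransactionId full_transaction_id,
							  Size est_shared_info)
{
	int			count;

	MemSet(shared_info_ptr, 0, est_shared_info);
	shared_info_ptr->is_read_in_progress = true;
	shared_info_ptr->cur_block_pos = -1;
	shared_info_ptr->full_transaction_id = full_transaction_id;
	shared_info_ptr->mycid = GetCurrentCommandId(true);

	for (count = 0; count < RINGSIZE; count++)
	{
		ParallelCopyLineBoundary *lineInfo =
			&shared_info_ptr->line_boundaries.ring[count];

		pg_atomic_init_u32(&(lineInfo->line_size), -1);
	}
}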

> 6.
> In function BeginParallelCopy(), you need to keep a provision to
> collect wal_usage and buf_usage stats.  See _bt_begin_parallel for
> reference.  Those will be required for pg_stat_statements.
>

Fixed
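Following the _bt_begin_parallel pattern, the provision looks roughly like this (a sketch; the PARALLEL_COPY_KEY_* names are placeholders):

/* In BeginParallelCopy(): estimate and allocate per-worker counters. */
shm_toc_estimate_chunk(&pcxt->estimator,
					   mul_size(sizeof(WalUsage), pcxt->nworkers));
shm_toc_estimate_keys(&pcxt->estimator, 1);
shm_toc_estimate_chunk(&pcxt->estimator,
					   mul_size(sizeof(BufferUsage), pcxt->nworkers));
shm_toc_estimate_keys(&pcxt->estimator, 1);

walusage = shm_toc_allocate(pcxt->toc,
							mul_size(sizeof(WalUsage), pcxt->nworkers));
shm_toc_insert(pcxt->toc, PARALLEL_COPY_KEY_WAL_USAGE, walusage);
bufferusage = shm_toc_allocate(pcxt->toc,
							   mul_size(sizeof(BufferUsage), pcxt->nworkers));
shm_toc_insert(pcxt->toc, PARALLEL_COPY_KEY_BUFFER_USAGE, bufferusage);

/* In each worker: bracket the copy work with the instrumentation calls. */
InstrStartParallelQuery();
/* ... perform the parallel copy ... */
InstrEndParallelQuery(&bufferusage[ParallelWorkerNumber],
					  &walusage[ParallelWorkerNumber]);

/* Back in the leader, after WaitForParallelWorkersToFinish(): */
for (i = 0; i < pcxt->nworkers_launched; i++)
	InstrAccumParallelQuery(&bufferusage[i], &walusage[i]);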

> 7.
> DeserializeString() -- it is better to name this function as RestoreString.
> ParallelWorkerInitialization() -- it is better to name this function
> as InitializeParallelCopyInfo or something like that, the current name
> is quite confusing.
> ParallelCopyLeader() -- how about ParallelCopyFrom? ParallelCopyLeader
> doesn't sound good to me.  You can suggest something else if you don't
> like ParallelCopyFrom
>

Fixed

> 8.
>  /*
> - * PopulateGlobalsForCopyFrom - Populates the common variables
> required for copy
> - * from operation. This is a helper function for BeginCopy function.
> + * PopulateCatalogInformation - Populates the common variables
> required for copy
> + * from operation. This is a helper function for BeginCopy &
> + * ParallelWorkerInitialization function.
>   */
>  static void
>  PopulateGlobalsForCopyFrom(CopyState cstate, TupleDesc tupDesc,
> - List *attnamelist)
> +    List *attnamelist)
>
> The actual function name and the name in function header don't match.
> I also don't like this function name, how about
> PopulateCommonCstateInfo?  Similarly how about changing
> PopulateCatalogInformation to PopulateCstateCatalogInfo?
>

Fixed

> 9.
> +static const struct
> +{
> + char *fn_name;
> + copy_data_source_cb fn_addr;
> +} InternalParallelCopyFuncPtrs[] =
> +
> +{
> + {
> + "copy_read_data", copy_read_data
> + },
> +};
>
> The function copy_read_data is present in
> src/backend/replication/logical/tablesync.c and seems to be used
> during logical replication.  Why do we want to expose this function as
> part of this patch?
>

I was thinking we could include the framework so that parallelism could be supported for logical replication too, and enhance it when needed. I have removed this in the new patch; it can be added back whenever required.

> 0003-Allow-copy-from-command-to-process-data-from-file-ST
> 10.
> In the commit message, you have written "The leader does not
> participate in the insertion of data, leaders only responsibility will
> be to identify the lines as fast as possible for the workers to do the
> actual copy operation. The leader waits till all the lines populated
> are processed by the workers and exits."
>
> I think you should also mention that we have chosen this design based
> on the reason "that everything stalls if the leader doesn't accept
> further input data, as well as when there are no available splitted
> chunks so it doesn't seem like a good idea to have the leader do other
> work.  This is backed by the performance data where we have seen that
> with 1 worker there is just a 5-10% (or whatever percentage difference
> you have seen) performance difference)".

Fixed.
Please find the new patch attached with the fixes.
Thoughts?

