Re: Parallel copy

From Rafia Sabih
Subject Re: Parallel copy
Date
Msg-id CA+FpmFcJ8ZxGg7rZr74F9BM0XVG8LvFFyM0zgALZOmF-Ox8Y=g@mail.gmail.com
In reply to Re: Parallel copy  (Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>)
Responses Re: Parallel copy  (Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>)
List pgsql-hackers
On Sat, 11 Jul 2020 at 08:55, Bharath Rupireddy
<bharath.rupireddyforpostgres@gmail.com> wrote:
>
> Thanks Vignesh for the review. Addressed the comments in 0006 patch.
>
> >
> > we can create a local variable and use in place of
> > cstate->pcdata->curr_data_block.
>
> Done.
>
> > +       if (cstate->raw_buf_index + sizeof(fld_count) >= (DATA_BLOCK_SIZE - 1))
> > +               AdjustFieldInfo(cstate, 1);
> > +
> > +       memcpy(&fld_count, &cstate->pcdata->curr_data_block->data[cstate->raw_buf_index], sizeof(fld_count));
> > Should this be like below, as the remaining size can fit in current block:
> >        if (cstate->raw_buf_index + sizeof(fld_count) >= DATA_BLOCK_SIZE)
> >
> > +       if ((cstate->raw_buf_index + sizeof(fld_size)) >= (DATA_BLOCK_SIZE - 1))
> > +       {
> > +               AdjustFieldInfo(cstate, 2);
> > +               *new_block_pos = pcshared_info->cur_block_pos;
> > +       }
> > Same as above.
>
> Yes you are right. Changed.
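
To make the off-by-one concrete, here is a minimal sketch that compares the two conditions discussed above. The 8-byte block size and the helper names are hypothetical and chosen only to keep the arithmetic visible; only the two comparisons themselves are taken from the quoted patch and review.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical tiny block size; the real DATA_BLOCK_SIZE is not shown here. */
#define SKETCH_BLOCK_SIZE ((size_t) 8)

/* Condition as originally posted: index + size >= (block size - 1). */
static bool
needs_new_block_original(size_t raw_buf_index, size_t fld_size)
{
    assert(raw_buf_index <= SKETCH_BLOCK_SIZE);
    return raw_buf_index + fld_size >= (SKETCH_BLOCK_SIZE - 1);
}

/* Condition suggested in the review: index + size >= block size. */
static bool
needs_new_block_suggested(size_t raw_buf_index, size_t fld_size)
{
    assert(raw_buf_index <= SKETCH_BLOCK_SIZE);
    return raw_buf_index + fld_size >= SKETCH_BLOCK_SIZE;
}
```

With a 2-byte field count, offset 5 is the only position where the two conditions disagree: bytes 5 and 6 still lie inside the 8-byte block, yet the original check would already ask for a new block.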
>
> >
> > +       movebytes = DATA_BLOCK_SIZE - cstate->raw_buf_index;
> > +
> > +       cstate->pcdata->curr_data_block->skip_bytes = movebytes;
> > +
> > +       data_block = &pcshared_info->data_blocks[block_pos];
> > +
> > +       if (movebytes > 0)
> > Instead of the above check, we can have an assert check for movebytes.
>
> No, we can't use an assert here. In the edge case where the current data
> block is filled exactly to DATA_BLOCK_SIZE, movebytes will be 0, but we
> still need to get a new data block. The movebytes > 0 check only lets us
> skip the memmove.
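
The edge case described above can be sketched as follows. This is an illustration under assumptions, not the patch's code: `SketchBlock`, `carry_over_tail`, and the 8-byte block size are hypothetical stand-ins, and only the `movebytes` computation and the `movebytes > 0` guard mirror the quoted hunk.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical tiny block size, used only for illustration. */
#define SKETCH_BLOCK_SIZE ((size_t) 8)

typedef struct SketchBlock
{
    char   data[8];
    size_t skip_bytes;      /* trailing bytes handed over to the next block */
} SketchBlock;

/*
 * Carry the unread tail of "cur" into the start of "next" and return the
 * number of bytes carried over.  The caller has already obtained "next",
 * which is needed even when there is nothing to carry.
 */
static size_t
carry_over_tail(SketchBlock *cur, SketchBlock *next, size_t raw_buf_index)
{
    size_t movebytes;

    assert(raw_buf_index <= SKETCH_BLOCK_SIZE);
    movebytes = SKETCH_BLOCK_SIZE - raw_buf_index;
    cur->skip_bytes = movebytes;

    /* movebytes == 0 is legal: the block was consumed exactly to its end. */
    if (movebytes > 0)
        memmove(next->data, &cur->data[raw_buf_index], movebytes);

    return movebytes;
}
```

An assertion such as `assert(movebytes > 0)` would fire in the exactly-full case, which is why the guard stays a plain `if` in this sketch.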
>
> > +       if (mode == 1)
> > +       {
> > +               cstate->pcdata->curr_data_block = data_block;
> > +               cstate->raw_buf_index = 0;
> > +       }
> > +       else if(mode == 2)
> > +       {
> > +               ParallelCopyDataBlock *prev_data_block = NULL;
> > +               prev_data_block = cstate->pcdata->curr_data_block;
> > +               prev_data_block->following_block = block_pos;
> > +               cstate->pcdata->curr_data_block = data_block;
> > +
> > +               if (prev_data_block->curr_blk_completed  == false)
> > +                       prev_data_block->curr_blk_completed = true;
> > +
> > +               cstate->raw_buf_index = 0;
> > +       }
> >
> > This code is common for both, keep in common flow and remove if (mode == 1)
> > cstate->pcdata->curr_data_block = data_block;
> > cstate->raw_buf_index = 0;
> >
>
> Done.
>
> > +#define CHECK_FIELD_COUNT \
> > +{\
> > +       if (fld_count == -1) \
> > +       { \
> > +               if (IsParallelCopy() && \
> > +                       !IsLeader()) \
> > +                       return true; \
> > +               else if (IsParallelCopy() && \
> > +                       IsLeader()) \
> > +               { \
> > +                       if (cstate->pcdata->curr_data_block->data[cstate->raw_buf_index + sizeof(fld_count)] != 0) \
> > +                               ereport(ERROR, \
> > +                                               (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), \
> > +                                               errmsg("received copy data after EOF marker"))); \
> > +                       return true; \
> > +               } \
> > We only copy sizeof(fld_count). Shouldn't we check fld_count !=
> > cstate->max_fields? Am I missing something here?
>
> fld_count != cstate->max_fields check is done after the above checks.
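
The ordering described above (end-of-data marker first, field-count mismatch second) can be sketched like this. In the PostgreSQL binary COPY format each tuple starts with a 16-bit field count in network byte order, and the file trailer is a 16-bit word containing -1. The enum, the function names, and the return-code style are hypothetical; the patch itself uses the CHECK_FIELD_COUNT macro and ereport.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Hypothetical result codes for this sketch. */
typedef enum SketchTupleStatus
{
    SKETCH_TUPLE_OK,
    SKETCH_END_OF_DATA,
    SKETCH_BAD_FIELD_COUNT
} SketchTupleStatus;

/* Decode a 16-bit big-endian (network order) signed integer. */
static int16_t
read_int16_be(const unsigned char *buf)
{
    uint16_t u = (uint16_t) ((buf[0] << 8) | buf[1]);
    int16_t  s;

    memcpy(&s, &u, sizeof(s));  /* reinterpret the bits as signed */
    return s;
}

/* Check for the -1 trailer first, then compare against the expected count. */
static SketchTupleStatus
classify_field_count(const unsigned char *buf, int16_t max_fields)
{
    int16_t fld_count;

    assert(buf != NULL);
    fld_count = read_int16_be(buf);

    if (fld_count == -1)
        return SKETCH_END_OF_DATA;
    if (fld_count != max_fields)
        return SKETCH_BAD_FIELD_COUNT;
    return SKETCH_TUPLE_OK;
}
```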
>
> > +       if ((DATA_BLOCK_SIZE - cstate->raw_buf_index) >= fld_size)
> > +       {
> > +               cstate->raw_buf_index = cstate->raw_buf_index + fld_size;
> > +       }
> > We can keep the check like cstate->raw_buf_index + fld_size < ..., for
> > better readability and consistency.
> >
>
> I think this is okay. It reads naturally: if the bytes available in the
> current data block are greater than or equal to fld_size, the tuple
> lies in the current data block.
>
> > +static pg_attribute_always_inline void
> > +CopyReadBinaryAttributeLeader(CopyState cstate, FmgrInfo *flinfo,
> > +       Oid typioparam, int32 typmod, uint32 *new_block_pos,
> > +       int m, ParallelCopyTupleInfo *tuple_start_info_ptr,
> > +       ParallelCopyTupleInfo *tuple_end_info_ptr, uint32 *line_size)
> > flinfo, typioparam & typmod are not used, so we can remove these parameters.
> >
>
> Done.
>
> > +static pg_attribute_always_inline void
> > +CopyReadBinaryAttributeLeader(CopyState cstate, FmgrInfo *flinfo,
> > +       Oid typioparam, int32 typmod, uint32 *new_block_pos,
> > +       int m, ParallelCopyTupleInfo *tuple_start_info_ptr,
> > +       ParallelCopyTupleInfo *tuple_end_info_ptr, uint32 *line_size)
> > I felt this function need not be an inline function.
>
> Yes. Changed.
>
> >
> > +               /* binary format */
> > +               /* for paralle copy leader, fill in the error
> > There are some typos, run spell check
>
> Done.
>
> >
> > +               /* raw_buf_index should never cross data block size,
> > +                * as the required number of data blocks would have
> > +                * been obtained in the above while loop.
> > +                */
> > In a few places the commenting style should be changed to postgres style
>
> Changed.
>
> >
> > +       if (cstate->pcdata->curr_data_block == NULL)
> > +       {
> > +               block_pos = WaitGetFreeCopyBlock(pcshared_info);
> > +
> > +               cstate->pcdata->curr_data_block = &pcshared_info->data_blocks[block_pos];
> > +
> > +               cstate->raw_buf_index = 0;
> > +
> > +               readbytes = CopyGetData(cstate, &cstate->pcdata->curr_data_block->data, 1, DATA_BLOCK_SIZE);
> > +
> > +               elog(DEBUG1, "LEADER - bytes read from file %d", readbytes);
> > +
> > +               if (cstate->reached_eof)
> > +                       return true;
> > +       }
> > There are many empty lines, these are not required.
> >
>
> Removed.
>
> >
> > +
> > +       fld_count = (int16) pg_ntoh16(fld_count);
> > +
> > +       CHECK_FIELD_COUNT;
> > +
> > +       cstate->raw_buf_index = cstate->raw_buf_index + sizeof(fld_count);
> > +       new_block_pos = pcshared_info->cur_block_pos;
> > You can run pg_indent once for the changes.
> >
>
> I ran pg_indent and observed that it modifies many places. If we need
> to run pg_indent on copy.c for parallel copy alone, then we first need
> to run it on plain copy.c, take those changes, and then run it on all
> the parallel copy files. I think we had better run pg_indent on all the
> parallel copy patches once and for all, maybe just before we finish up
> all the code reviews.
>
> > +       if (mode == 1)
> > +       {
> > +               cstate->pcdata->curr_data_block = data_block;
> > +               cstate->raw_buf_index = 0;
> > +       }
> > +       else if(mode == 2)
> > +       {
> > Could use macros for 1 & 2 for better readability.
>
> Done.
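
Combining this with the earlier suggestion to keep the shared assignments in the common flow, the block switch might look roughly like the sketch below. The enum names are hypothetical labels for the literals 1 and 2 (inferred from the field-count and field-size call sites quoted above), and the structs are reduced stand-ins for the patch's types.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical names for the magic numbers 1 and 2. */
typedef enum SketchFieldMode
{
    SKETCH_FIELD_COUNT = 1,
    SKETCH_FIELD_SIZE = 2
} SketchFieldMode;

typedef struct SketchBlock
{
    uint32_t following_block;
    bool     curr_blk_completed;
} SketchBlock;

typedef struct SketchState
{
    SketchBlock *curr_data_block;
    size_t       raw_buf_index;
} SketchState;

/* Make blocks[block_pos] the current block. */
static void
switch_to_block(SketchState *state, SketchBlock *blocks,
                uint32_t block_pos, SketchFieldMode mode)
{
    assert(state->curr_data_block != NULL);

    /* Only the field-size path links and completes the previous block. */
    if (mode == SKETCH_FIELD_SIZE)
    {
        SketchBlock *prev = state->curr_data_block;

        prev->following_block = block_pos;
        prev->curr_blk_completed = true;
    }

    /* Common to both modes, so it lives outside the branch. */
    state->curr_data_block = &blocks[block_pos];
    state->raw_buf_index = 0;
}
```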
>
> >
> > +
> > +                               if (following_block_id == -1)
> > +                                       break;
> > +                       }
> > +
> > +                       if (following_block_id != -1)
> > +                               pg_atomic_add_fetch_u32(&pcshared_info->data_blocks[following_block_id].unprocessed_line_parts, 1);
> > +
> > +                       *line_size = *line_size + tuple_end_info_ptr->offset + 1;
> > +               }
> > We could calculate the size as we parse and identify one record; if we
> > do it that way, this can be removed.
> >
>
> Done.
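
As an illustration of computing the size while parsing, the sketch below adds up the bytes of one binary-format tuple field by field. It assumes "line size" means the total byte length of the tuple, which the thread does not spell out; the function name and the array-of-lengths input are hypothetical. The layout follows the documented binary COPY format: a 16-bit field count, then for each field a 32-bit length word (-1 for NULL) followed by that many bytes.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Accumulate the on-disk size of one tuple as its fields are visited,
 * instead of walking the data blocks again afterwards.
 */
static uint32_t
sketch_tuple_size(const int32_t *fld_sizes, int16_t fld_count)
{
    uint32_t line_size = (uint32_t) sizeof(int16_t);   /* field count word */
    int16_t  i;

    assert(fld_count >= 0);

    for (i = 0; i < fld_count; i++)
    {
        line_size += (uint32_t) sizeof(int32_t);       /* length word */
        if (fld_sizes[i] > 0)                          /* -1 means NULL */
            line_size += (uint32_t) fld_sizes[i];
    }

    return line_size;
}
```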

Hi Bharath,

I was looking forward to reviewing this patch set, but unfortunately it
shows a reject in copy.c and might need a rebase.
I was applying it on master, on top of commit
cd22d3cdb9bd9963c694c01a8c0232bbae3ddcfb.

-- 
Regards,
Rafia Sabih


