Re: Parallel copy

Поиск
Список
Период
Сортировка
От vignesh C
Тема Re: Parallel copy
Дата
Msg-id CALDaNm2RAhNnT=dDRokH+1aJ-kGM4RX3EfADqGTnKRKAcj+SEw@mail.gmail.com
обсуждение исходный текст
Ответ на Re: Parallel copy  (Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>)
Ответы Re: Parallel copy  (Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>)
Список pgsql-hackers
On Mon, Jun 15, 2020 at 4:39 PM Bharath Rupireddy
<bharath.rupireddyforpostgres@gmail.com> wrote:
>
> The above tests were run with the configuration in the attached config.txt, which is the same one used for the performance tests of
csv/text files posted earlier in this mail chain.
 
>
> Request the community to take this patch up for review along with the parallel copy for csv/text file patches and
provide feedback.
 
>

I have reviewed the patch; a few comments:
+
+       /*
+        * Parallel copy for binary formatted files
+        */
+       ParallelCopyDataBlock *curr_data_block;
+       ParallelCopyDataBlock *prev_data_block;
+       uint32                             curr_data_offset;
+       uint32                             curr_block_pos;
+       ParallelCopyTupleInfo  curr_tuple_start_info;
+       ParallelCopyTupleInfo  curr_tuple_end_info;
 } CopyStateData;

The newly added members should be placed in ParallelCopyData rather than in CopyStateData.

+       if (cstate->curr_tuple_start_info.block_id ==
cstate->curr_tuple_end_info.block_id)
+       {
+               elog(DEBUG1,"LEADER - tuple lies in a single data block");
+
+               line_size = cstate->curr_tuple_end_info.offset -
cstate->curr_tuple_start_info.offset + 1;
+
pg_atomic_add_fetch_u32(&pcshared_info->data_blocks[cstate->curr_tuple_start_info.block_id].unprocessed_line_parts,
1);
+       }
+       else
+       {
+               uint32 following_block_id =
pcshared_info->data_blocks[cstate->curr_tuple_start_info.block_id].following_block;
+
+               elog(DEBUG1,"LEADER - tuple is spread across data blocks");
+
+               line_size = DATA_BLOCK_SIZE -
cstate->curr_tuple_start_info.offset -
+
pcshared_info->data_blocks[cstate->curr_tuple_start_info.block_id].skip_bytes;
+
+
pg_atomic_add_fetch_u32(&pcshared_info->data_blocks[cstate->curr_tuple_start_info.block_id].unprocessed_line_parts,
1);
+
+               while (following_block_id !=
cstate->curr_tuple_end_info.block_id)
+               {
+                       line_size = line_size + DATA_BLOCK_SIZE -
pcshared_info->data_blocks[following_block_id].skip_bytes;
+
+
pg_atomic_add_fetch_u32(&pcshared_info->data_blocks[following_block_id].unprocessed_line_parts,
1);
+
+                       following_block_id =
pcshared_info->data_blocks[following_block_id].following_block;
+
+                       if (following_block_id == -1)
+                               break;
+               }
+
+               if (following_block_id != -1)
+
pg_atomic_add_fetch_u32(&pcshared_info->data_blocks[following_block_id].unprocessed_line_parts,
1);
+
+               line_size = line_size + cstate->curr_tuple_end_info.offset + 1;
+       }

line_size can be accumulated as we process the tuple in
CopyReadBinaryTupleLeader and then set once at the end. That way the
above code can be removed.

+
+       /*
+        * Parallel copy for binary formatted files
+        */
+       ParallelCopyDataBlock *curr_data_block;
+       ParallelCopyDataBlock *prev_data_block;
+       uint32                             curr_data_offset;
+       uint32                             curr_block_pos;
+       ParallelCopyTupleInfo  curr_tuple_start_info;
+       ParallelCopyTupleInfo  curr_tuple_end_info;
 } CopyStateData;

The curr_block_pos variable is already present in ParallelCopyShmInfo; we
could use that one and remove it from here.
For curr_data_offset, a similar variable, raw_buf_index, is present in
CopyStateData; we could use that one and remove it from here.

+ if (cstate->curr_data_offset + sizeof(fld_count) >= (DATA_BLOCK_SIZE - 1))
+ {
+ ParallelCopyDataBlock *data_block = NULL;
+ uint8 movebytes = 0;
+
+ block_pos = WaitGetFreeCopyBlock(pcshared_info);
+
+ movebytes = DATA_BLOCK_SIZE - cstate->curr_data_offset;
+
+ cstate->curr_data_block->skip_bytes = movebytes;
+
+ data_block = &pcshared_info->data_blocks[block_pos];
+
+ if (movebytes > 0)
+ memmove(&data_block->data[0],
&cstate->curr_data_block->data[cstate->curr_data_offset],
+ movebytes);
+
+ elog(DEBUG1, "LEADER - field count is spread across data blocks -
moved %d bytes from current block %u to %u block",
+ movebytes, cstate->curr_block_pos, block_pos);
+
+ readbytes = CopyGetData(cstate, &data_block->data[movebytes], 1,
(DATA_BLOCK_SIZE - movebytes));
+
+ elog(DEBUG1, "LEADER - bytes read from file after field count is
moved to next data block %d", readbytes);
+
+ if (cstate->reached_eof)
+ ereport(ERROR,
+ (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+ errmsg("unexpected EOF in COPY data")));
+
+ cstate->curr_data_block = data_block;
+ cstate->curr_data_offset = 0;
+ cstate->curr_block_pos = block_pos;
+ }

This code is duplicated in CopyReadBinaryTupleLeader and
CopyReadBinaryAttributeLeader. We could move it into a function and reuse it.

+/*
+ * CopyReadBinaryAttributeWorker - leader identifies boundaries/offsets
+ * for each attribute/column, it moves on to next data block if the
+ * attribute/column is spread across data blocks.
+ */
+static pg_attribute_always_inline Datum
+CopyReadBinaryAttributeWorker(CopyState cstate, int column_no,
+               FmgrInfo *flinfo, Oid typioparam, int32 typmod, bool *isnull)
+{
+       int32           fld_size;
+       Datum           result;

column_no is not used; it can be removed.

+       if (fld_count == -1)
+       {
+               /*
+                       * Received EOF marker.  In a V3-protocol copy,
wait for the
+                       * protocol-level EOF, and complain if it doesn't come
+                       * immediately.  This ensures that we correctly
handle CopyFail,
+                       * if client chooses to send that now.
+                       *
+                       * Note that we MUST NOT try to read more data
in an old-protocol
+                       * copy, since there is no protocol-level EOF
marker then.  We
+                       * could go either way for copy from file, but
choose to throw
+                       * error if there's data after the EOF marker,
for consistency
+                       * with the new-protocol case.
+                       */
+               char            dummy;
+
+               if (cstate->copy_dest != COPY_OLD_FE &&
+                       CopyGetData(cstate, &dummy, 1, 1) > 0)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                                               errmsg("received copy
data after EOF marker")));
+               return true;
+       }
+
+       if (fld_count != attr_count)
+               ereport(ERROR,
+                       (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                       errmsg("row field count is %d, expected %d",
+                       (int) fld_count, attr_count)));
+
+       cstate->curr_tuple_start_info.block_id = cstate->curr_block_pos;
+       cstate->curr_tuple_start_info.offset =  cstate->curr_data_offset;
+       cstate->curr_data_offset = cstate->curr_data_offset + sizeof(fld_count);
+       new_block_pos = cstate->curr_block_pos;
+
+       foreach(cur, cstate->attnumlist)
+       {
+               int                     attnum = lfirst_int(cur);
+               int                     m = attnum - 1;
+               Form_pg_attribute att = TupleDescAttr(tupDesc, m);

The above code is present in both NextCopyFrom and CopyReadBinaryTupleLeader.
Please check whether we can make a common function, or whether we could use
NextCopyFrom as it is.

+       memcpy(&fld_count,
&cstate->curr_data_block->data[cstate->curr_data_offset],
sizeof(fld_count));
+       fld_count = (int16) pg_ntoh16(fld_count);
+
+       if (fld_count == -1)
+       {
+               return true;
+       }

Should this be an assert in the CopyReadBinaryTupleWorker function, since this
check is already done in the leader?

Regards,
Vignesh
EnterpriseDB: http://www.enterprisedb.com



В списке pgsql-hackers по дате отправления:

Предыдущее
От: Amit Kapila
Дата:
Сообщение: Re: min_safe_lsn column in pg_replication_slots view
Следующее
От: Bruce Momjian
Дата:
Сообщение: Re: Transactions involving multiple postgres foreign servers, take 2