Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions

From: Amit Kapila
Subject: Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions
Msg-id: CAA4eK1Jim-3BnTH_Vt=nvuzLGKacyuHKFmPdssoJ5Go0MnqRBg@mail.gmail.com
In reply to: Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions  (Dilip Kumar <dilipbalaut@gmail.com>)
Responses: Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions  (Dilip Kumar <dilipbalaut@gmail.com>)
Re: PATCH: logical_work_mem and logical streaming of large in-progress transactions  (Dilip Kumar <dilipbalaut@gmail.com>)
List: pgsql-hackers

On Wed, May 13, 2020 at 11:35 AM Dilip Kumar <dilipbalaut@gmail.com> wrote:
>
> On Tue, May 12, 2020 at 4:39 PM Amit Kapila <amit.kapila16@gmail.com> wrote:
> >
> >
> > v20-0003-Extend-the-output-plugin-API-with-stream-methods
> > ----------------------------------------------------------------------------------------
> > 1.
> > +static void
> > +pg_decode_stream_change(LogicalDecodingContext *ctx,
> > + ReorderBufferTXN *txn,
> > + Relation relation,
> > + ReorderBufferChange *change)
> > +{
> > + OutputPluginPrepareWrite(ctx, true);
> > + appendStringInfo(ctx->out, "streaming change for TXN %u", txn->xid);
> > + OutputPluginWrite(ctx, true);
> > +}
> > +
> > +static void
> > +pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
> > +   int nrelations, Relation relations[],
> > +   ReorderBufferChange *change)
> > +{
> > + OutputPluginPrepareWrite(ctx, true);
> > + appendStringInfo(ctx->out, "streaming truncate for TXN %u", txn->xid);
> > + OutputPluginWrite(ctx, true);
> > +}
> >
> > In the above and similar APIs, there are parameters like relation
> > which are not used.  I think you should add some comments atop these
> > APIs to explain why it is so? I guess it is because we want to keep
> > them similar to non-stream version of APIs and we can't display
> > relation or other information as the transaction is still in-progress.
>
> I think the interfaces are designed that way because other
> decoding plugins might need them, e.g. in pgoutput we need change and
> relation, but not here.  We have other similar examples too, e.g.
> pg_decode_message has the parameter txn but does not use it.  Do you think we
> still need to add comments?
>

In that case, we can leave it as is, but let's ensure that we are not
exposing any parameter that is unused; if there is one for some
reason, we should document it. I will also look into this.

> > 4.
> > +static void
> > +stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
> > +{
> > + LogicalDecodingContext *ctx = cache->private_data;
> > + LogicalErrorCallbackState state;
> > + ErrorContextCallback errcallback;
> > +
> > + Assert(!ctx->fast_forward);
> > +
> > + /* We're only supposed to call this when streaming is supported. */
> > + Assert(ctx->streaming);
> > +
> > + /* Push callback + info on the error context stack */
> > + state.ctx = ctx;
> > + state.callback_name = "stream_start";
> > + /* state.report_location = apply_lsn; */
> >
> > Why can't we supply the report_location here?  I think here we need to
> > report txn->first_lsn if this is the very first stream and
> > txn->final_lsn if it is any consecutive one.
>
> I am not sure about this, because for the very first stream we will
> report the location of the first lsn of the stream and for the
> consecutive stream we will report the last lsn in the stream.
>

Yeah, that doesn't seem to be consistent.  How about if we get it as an
additional parameter?  The caller can pass the lsn of the very first
change it is trying to decode in this stream.
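
A rough sketch of that suggestion, under stated assumptions: the types below are simplified stand-ins rather than the real PostgreSQL definitions, and the `_sketch` function name, the `first_lsn` parameter, and the `last_state` field are hypothetical, used only to make the idea concrete.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Simplified stand-ins for the PostgreSQL types; not the real definitions. */
typedef uint64_t XLogRecPtr;

typedef struct LogicalErrorCallbackState
{
	const char *callback_name;
	XLogRecPtr	report_location;
} LogicalErrorCallbackState;

typedef struct LogicalDecodingContext
{
	bool		streaming;
	/* Hypothetical field: records what the last wrapper call set up. */
	LogicalErrorCallbackState last_state;
} LogicalDecodingContext;

/*
 * Hypothetical variant of stream_start_cb_wrapper: the caller passes the LSN
 * of the first change it is about to decode in this stream, so the wrapper
 * does not have to choose between txn->first_lsn and txn->final_lsn.
 */
static void
stream_start_cb_wrapper_sketch(LogicalDecodingContext *ctx,
							   XLogRecPtr first_lsn)
{
	LogicalErrorCallbackState state;

	/* We're only supposed to call this when streaming is supported. */
	assert(ctx->streaming);

	state.callback_name = "stream_start";
	state.report_location = first_lsn;

	/* The real code would push state on the error context stack here. */
	ctx->last_state = state;
}
```

With this shape, the first and every later stream report the same kind of location: the start of the chunk being decoded.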

> >
> > 11.
> > - * HeapTupleSatisfiesHistoricMVCC.
> > + * tqual.c's HeapTupleSatisfiesHistoricMVCC.
> > + *
> > + * We do build the hash table even if there are no CIDs. That's
> > + * because when streaming in-progress transactions we may run into
> > + * tuples with the CID before actually decoding them. Think e.g. about
> > + * INSERT followed by TRUNCATE, where the TRUNCATE may not be decoded
> > + * yet when applying the INSERT. So we build a hash table so that
> > + * ResolveCminCmaxDuringDecoding does not segfault in this case.
> > + *
> > + * XXX We might limit this behavior to streaming mode, and just bail
> > + * out when decoding transaction at commit time (at which point it's
> > + * guaranteed to see all CIDs).
> >   */
> >  static void
> >  ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
> > @@ -1350,9 +1498,6 @@ ReorderBufferBuildTupleCidHash(ReorderBuffer
> > *rb, ReorderBufferTXN *txn)
> >   dlist_iter iter;
> >   HASHCTL hash_ctl;
> >
> > - if (!rbtxn_has_catalog_changes(txn) || dlist_is_empty(&txn->tuplecids))
> > - return;
> > -
> >
> > I don't understand this change.  Why would "INSERT followed by
> > TRUNCATE" lead to a tuple that can come up for decoding before its
> > CID?
>
> Actually, even if we haven't decoded the DDL operation yet, in the
> actual system table the tuple might already have been deleted by the next
> operation.  E.g., while we are streaming the INSERT, it is possible that
> the truncate has already deleted that tuple and set the max for the
> tuple.  So before the streaming patch, we were sending the INSERT
> only on commit, so by that time we had seen all the operations that
> did DDL and would have already prepared the tuple CID hash.
>

Okay, but for that case, how good is it that we always allow the CID
hash table to be built even if there are no catalog changes in the TXN
(see changes in ReorderBufferBuildTupleCidHash)?  Can't we detect that
while resolving the cmin/cmax?
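
One way to picture detecting this at lookup time, as a sketch only: `TupleCidMap` and `resolve_cmin_cmax_sketch` are hypothetical stand-ins (the real lookup goes through a hash table keyed on relfilenode and tuple id), and a single-entry map is assumed to keep the example small.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t CommandId;

/* Simplified stand-in for the tuple CID hash; one entry is enough here. */
typedef struct TupleCidMap
{
	uint32_t	tuple_key;
	CommandId	cmin;
	CommandId	cmax;
} TupleCidMap;

/*
 * Hypothetical lookup: returns false ("unresolved") when no map was built,
 * instead of requiring the map to exist for every transaction.
 */
static bool
resolve_cmin_cmax_sketch(const TupleCidMap *map, uint32_t tuple_key,
						 CommandId *cmin, CommandId *cmax)
{
	/* No catalog changes were recorded, so nothing can be resolved. */
	if (map == NULL)
		return false;

	if (map->tuple_key != tuple_key)
		return false;

	*cmin = map->cmin;
	*cmax = map->cmax;
	return true;
}
```

The caller would then treat "unresolved" the same way it already treats a missing entry, and the early return in ReorderBufferBuildTupleCidHash could stay.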

Few more comments for v20-0005-Implement-streaming-mode-in-ReorderBuffer:
----------------------------------------------------------------------------------------------------------------
1.
/*
- * Binary heap comparison function.
+ * Binary heap comparison function (regular non-streaming iterator).
  */
 static int
 ReorderBufferIterCompare(Datum a, Datum b, void *arg)

It seems to me the above comment change is not required as per the latest patch.

2.
 * For subtransactions, we only mark them as streamed when there are
+ * any changes in them.
+ *
+ * We do it this way because of aborts - we don't want to send aborts
+ * for XIDs the downstream is not aware of. And of course, it always
+ * knows about the toplevel xact (we send the XID in all messages),
+ * but we never stream XIDs of empty subxacts.
+ */
+ if ((!txn->toptxn) || (txn->nentries_mem != 0))
+ txn->txn_flags |= RBTXN_IS_STREAMED;

/when there are any changes in them/when there are changes in them.  I
think we don't need 'any' in the above sentence.

3.
And, during catalog scan we can check the status of the xid and
+ * if it is aborted we will report a specific error that we can ignore.  We
+ * might have already streamed some of the changes for the aborted
+ * (sub)transaction, but that is fine because when we decode the abort we will
+ * stream abort message to truncate the changes in the subscriber.
+ */
+static inline void
+SetupCheckXidLive(TransactionId xid)

In the above comment, I don't think it is right to say that we ignore
the error raised due to the aborted transaction.  We need to say that
we discard the already streamed changes on such an error.

4.
+static inline void
+SetupCheckXidLive(TransactionId xid)
+{
  /*
- * If this transaction has no snapshot, it didn't make any changes to the
- * database, so there's nothing to decode.  Note that
- * ReorderBufferCommitChild will have transferred any snapshots from
- * subtransactions if there were any.
+ * setup CheckXidAlive if it's not committed yet. We don't check if the xid
+ * aborted. That will happen during catalog access.  Also reset the
+ * sysbegin_called flag.
  */
- if (txn->base_snapshot == NULL)
+ if (!TransactionIdDidCommit(xid))
  {
- Assert(txn->ninvalidations == 0);
- ReorderBufferCleanupTXN(rb, txn);
- return;
+ CheckXidAlive = xid;
+ bsysscan = false;
  }

I think this function is inline because it needs to be called for each
change.  Whether or not that is the reason, isn't it better to check
if the passed xid is the same as CheckXidAlive before calling
TransactionIdDidCommit?  TransactionIdDidCommit can be costly, and
calling it for each change might not be a good idea.
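
A minimal sketch of the cheaper ordering, with assumptions labeled: `mock_TransactionIdDidCommit` and `did_commit_calls` are hypothetical test scaffolding standing in for the real transaction-status lookup, and the `_sketch` suffix marks the function as an illustration rather than the patch's code.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;
#define InvalidTransactionId ((TransactionId) 0)

static TransactionId CheckXidAlive = InvalidTransactionId;
static bool bsysscan = false;
static int	did_commit_calls = 0;	/* counts the expensive lookups */

/* Mock of TransactionIdDidCommit: pretend nothing has committed yet. */
static bool
mock_TransactionIdDidCommit(TransactionId xid)
{
	(void) xid;
	did_commit_calls++;
	return false;
}

static inline void
SetupCheckXidLive_sketch(TransactionId xid)
{
	/* Cheap check first: nothing to do if this xid is already set up. */
	if (xid == CheckXidAlive)
		return;

	/* Only now pay for the commit-status lookup. */
	if (!mock_TransactionIdDidCommit(xid))
	{
		CheckXidAlive = xid;
		bsysscan = false;
	}
}
```

Calling this repeatedly for changes of the same xid performs the status lookup once. Note that the early return also skips the bsysscan reset, so whether that flag must be reset per change would need to be decided separately.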

5.
setup CheckXidAlive if it's not committed yet. We don't check if the xid
+ * aborted. That will happen during catalog access.  Also reset the
+ * sysbegin_called flag.

/if the xid aborted/if the xid is aborted.  A comma is also missing after "Also".

6.
ReorderBufferProcessTXN()
{
..
- /* build data to be able to lookup the CommandIds of catalog tuples */
+ /*
+ * build data to be able to lookup the CommandIds of catalog tuples
+ */
  ReorderBufferBuildTupleCidHash(rb, txn);
..
}

Is there a need to change the formatting of the comment?

7.
ReorderBufferProcessTXN()
{
..
  if (using_subtxn)
- BeginInternalSubTransaction("replay");
+ BeginInternalSubTransaction("stream");
  else
  StartTransactionCommand();
..
}

I am not sure unconditionally changing "replay" to "stream" is a good
idea.  How about something like BeginInternalSubTransaction(streaming
? "stream" : "replay");?

8.
@@ -1588,8 +1766,6 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
  * use as a normal record. It'll be cleaned up at the end
  * of INSERT processing.
  */
- if (specinsert == NULL)
- elog(ERROR, "invalid ordering of speculative insertion changes");

You have removed this check, but all other handling of specinsert is
the same as far as this patch is concerned.  Why so?

9.
@@ -1676,8 +1860,6 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
  * freed/reused while restoring spooled data from
  * disk.
  */
- Assert(change->data.tp.newtuple != NULL);
-
  dlist_delete(&change->node);

Why is this Assert removed?

10.
@@ -1753,7 +1935,15 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
  relations[nrelations++] = relation;
  }

- rb->apply_truncate(rb, txn, nrelations, relations, change);
+ if (streaming)
+ {
+ rb->stream_truncate(rb, txn, nrelations, relations, change);
+
+ /* Remember that we have sent some data. */
+ change->txn->any_data_sent = true;
+ }
+ else
+ rb->apply_truncate(rb, txn, nrelations, relations, change);

Can we encapsulate this in a separate function like
ReorderBufferApplyTruncate or something like that?  Basically, rather
than having the streaming check in this function, let's do it in some other
internal function.  And we can likewise do it for all the streaming
checks in this function, or at least wherever it is feasible.  That
will make this function look clean.
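
Roughly what such a helper could look like, as a sketch under simplifying assumptions: the structs below are cut-down stand-ins for ReorderBuffer and ReorderBufferTXN, the callbacks take fewer arguments than the real ones (no relations array or change), and the counting callbacks exist only so the dispatch can be observed.

```c
#include <assert.h>
#include <stdbool.h>

/* Simplified stand-ins; the real ReorderBuffer structs carry much more. */
typedef struct ReorderBufferTXN
{
	bool		any_data_sent;
} ReorderBufferTXN;

typedef struct ReorderBuffer ReorderBuffer;
typedef void (*TruncateCB) (ReorderBuffer *rb, ReorderBufferTXN *txn,
							int nrelations);

struct ReorderBuffer
{
	TruncateCB	apply_truncate;
	TruncateCB	stream_truncate;
	int			apply_calls;	/* bookkeeping for this example only */
	int			stream_calls;
};

static void
count_apply(ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations)
{
	(void) txn;
	(void) nrelations;
	rb->apply_calls++;
}

static void
count_stream(ReorderBuffer *rb, ReorderBufferTXN *txn, int nrelations)
{
	(void) txn;
	(void) nrelations;
	rb->stream_calls++;
}

/* Hypothetical helper: keeps the streaming check out of the main loop. */
static void
ReorderBufferApplyTruncate_sketch(ReorderBuffer *rb, ReorderBufferTXN *txn,
								  int nrelations, bool streaming)
{
	if (streaming)
	{
		rb->stream_truncate(rb, txn, nrelations);

		/* Remember that we have sent some data. */
		txn->any_data_sent = true;
	}
	else
		rb->apply_truncate(rb, txn, nrelations);
}
```

The caller then makes a single call with the streaming flag, and the same pattern could be repeated for the change and message callbacks.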

11.
+ * We currently can only decode a transaction's contents when its commit
+ * record is read because that's the only place where we know about cache
+ * invalidations. Thus, once a toplevel commit is read, we iterate over the top
+ * and subtransactions (using a k-way merge) and replay the changes in lsn
+ * order.
+ */
+void
+ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
{
..

I think the above comment needs to be updated after this patch. This
API can now be used while decoding both an in-progress and a
committed transaction.

-- 
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com


