+/*
+ * Write MESSAGE to stream
+ */
+void
+logicalrep_write_message(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr lsn,
+ bool transactional, const char *prefix, Size sz,
+ const char *message)
+{
+ uint8 flags = 0;
+
+ pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
+
Similar to the UPDATE/DELETE/INSERT records decoded when streaming is being
used, we need to add transaction id for transactional messages. May be we add
that even in case of non-streaming case and use it to decide whether it's a
transactional message or not. That might save us a byte when we are adding a
transaction id.
My preference is to add in the xid when streaming is enabled. (1) It is a more consistent implementation with the other message types, and (2) it saves 3 bytes when streaming is disabled. I've attached an updated patch. It is not a strong preference, though, if you suggest a different approach.
+ /* encode and send message flags */
+ if (transactional)
+ flags |= MESSAGE_TRANSACTIONAL;
+
+ pq_sendint8(out, flags);
Is 8 bits enough considering future improvements? What if we need to use more
than 8 bit flags?
8 possible flags already sounds like a lot, here, so I suspect that a byte will be sufficient for the foreseeable future. If we needed to go beyond that, it'd be a protocol version bump. (Well, it might first warrant reflection as to why we had so many flags...)
@@ -1936,6 +1936,9 @@ apply_dispatch(StringInfo s)
apply_handle_origin(s);
return;
+ case LOGICAL_REP_MSG_MESSAGE:
Should we add the logical message to the WAL downstream so that it flows
further down to a cascaded logical replica. Should that be controlled
by an option?
Hmm, I can't think of a use case for this, but perhaps someone could. Do you, or does anyone, have something in mind? I think we provide a lot of value with logical messages in pgoutput without supporting consumption from a downstream replica, so perhaps this is better considered separately.
If we want this, I think we would add a "messages" option on the subscription. If present, the subscriber will receive messages and pass them to any downstream subscribers. I started working on this and it does expand the change's footprint. As is, a developer would consume messages by connecting to a pgoutput slot on the message's origin. (e.g. via Debezium or a custom client) The subscription and logical worker infrastructure don't know about messages, but they would need to in order to support consuming an origin's messages on a downstream logical replica. In any case, I'll keep working on it so we can see what it looks like.
Cheers,
Dave