Re: Include sequence relation support in logical replication

Поиск
Список
Период
Сортировка
От Andres Freund
Тема Re: Include sequence relation support in logical replication
Дата
Msg-id 20200325004439.wa6s4eg54qm3qyfs@alap3.anarazel.de
обсуждение исходный текст
Ответ на Include sequence relation support in logical replication  (Cary Huang <cary.huang@highgo.ca>)
Список pgsql-hackers
On 2020-03-24 16:19:21 -0700, Cary Huang wrote:
> Hi
> 
> 
> 
> From the PG logical replication documentation, I see that there is a
> listed limitation that sequence relation is not replicated
> logically. After some examination, I see that retrieving the next
> value from a sequence using the nextval() call will emits a WAL update
> every 32 calls to nextval(). In fact, when it emits a WAL update, it
> will write a future value 32 increments from now, and maintain a
> internal cache for delivering sequence numbers. It is done this way to
> minimize the write operation to WAL record at a risk of losing some
> values during a crash. So if we were to replicate the sequence, the
> subscriber will receive a future value (32 calls to nextval()) from
> now, and it obviously does not reflect current status. Sequence
> changes caused by other sequence-related SQL functions like setval()
> or ALTER SEQUENCE xxx, will always emit a WAL update, so replicating
> changes caused by these should not be a problem. 
> 
> 
> 
> I have shared a patch that allows sequence relation to be supported in logical replication via the decoding plugin (
test_decodingfor example ); it does not support sequence relation in logical replication between a PG publisher and a
PGsubscriber via pgoutput plugin as it will require much more work. For the replication to make sense, the patch
actuallydisables the WAL update at every 32 nextval() calls, so every call to nextval() will emit a WAL update for
properreplication. This is done by setting SEQ_LOG_VALS to 0 in sequence.c
 
> 
> 
> 
> I think the question is that should we minimize WAL update frequency (every 32 calls) for getting next value in a
sequenceat a cost of losing values during crash or being able to replicate a sequence relation properly at a cost or
moreWAL updates?
 
> 
> 
> 
> 
> 
> Cary Huang
> 
> -------------
> 
> HighGo Software Inc. (Canada)
> 
> mailto:cary.huang@highgo.ca
> 
> http://www.highgo.ca

> diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
> index 93c948856e..7a7e572d6c 100644
> --- a/contrib/test_decoding/test_decoding.c
> +++ b/contrib/test_decoding/test_decoding.c
> @@ -466,6 +466,15 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
>                                      &change->data.tp.oldtuple->tuple,
>                                      true);
>              break;
> +        case REORDER_BUFFER_CHANGE_SEQUENCE:
> +                    appendStringInfoString(ctx->out, " SEQUENCE:");
> +                    if (change->data.sequence.newtuple == NULL)
> +                        appendStringInfoString(ctx->out, " (no-tuple-data)");
> +                    else
> +                        tuple_to_stringinfo(ctx->out, tupdesc,
> +                                            &change->data.sequence.newtuple->tuple,
> +                                            false);
> +                    break;
>          default:
>              Assert(false);
>      }
> diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
> index 6aab73bfd4..941015e4aa 100644
> --- a/src/backend/commands/sequence.c
> +++ b/src/backend/commands/sequence.c
> @@ -49,11 +49,10 @@
>  
>  
>  /*
> - * We don't want to log each fetching of a value from a sequence,
> - * so we pre-log a few fetches in advance. In the event of
> - * crash we can lose (skip over) as many values as we pre-logged.
> + * Sequence replication is now supported and we will now need to log each sequence
> + * update to WAL such that the standby can properly receive the sequence change
>   */
> -#define SEQ_LOG_VALS    32
> +#define SEQ_LOG_VALS    0
>  
>  /*
>   * The "special area" of a sequence's buffer page looks like this.
> diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
> index c2e5e3abf8..3dc14ead08 100644
> --- a/src/backend/replication/logical/decode.c
> +++ b/src/backend/replication/logical/decode.c
> @@ -42,6 +42,7 @@
>  #include "replication/reorderbuffer.h"
>  #include "replication/snapbuild.h"
>  #include "storage/standby.h"
> +#include "commands/sequence.h"
>  
>  typedef struct XLogRecordBuffer
>  {
> @@ -70,9 +71,11 @@ static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
>                           xl_xact_parsed_commit *parsed, TransactionId xid);
>  static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
>                          xl_xact_parsed_abort *parsed, TransactionId xid);
> +static void DecodeSequence(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
>  
>  /* common function to decode tuples */
>  static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
> +static void DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple);
>  
>  /*
>   * Take every XLogReadRecord()ed record and perform the actions required to
> @@ -130,6 +133,10 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
>              DecodeLogicalMsgOp(ctx, &buf);
>              break;
>  
> +        case RM_SEQ_ID:
> +            DecodeSequence(ctx, &buf);
> +            break;
> +
>              /*
>               * Rmgrs irrelevant for logical decoding; they describe stuff not
>               * represented in logical decoding. Add new rmgrs in rmgrlist.h's
> @@ -145,7 +152,6 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
>          case RM_HASH_ID:
>          case RM_GIN_ID:
>          case RM_GIST_ID:
> -        case RM_SEQ_ID:
>          case RM_SPGIST_ID:
>          case RM_BRIN_ID:
>          case RM_COMMIT_TS_ID:
> @@ -1052,3 +1058,80 @@ DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
>      header->t_infomask2 = xlhdr.t_infomask2;
>      header->t_hoff = xlhdr.t_hoff;
>  }
> +
> +/*
> + * Decode Sequence Tuple
> + */
> +static void
> +DecodeSeqTuple(char *data, Size len, ReorderBufferTupleBuf *tuple)
> +{
> +    int            datalen = len - sizeof(xl_seq_rec) - SizeofHeapTupleHeader;
> +
> +    Assert(datalen >= 0);
> +
> +    tuple->tuple.t_len = datalen + SizeofHeapTupleHeader;;
> +
> +    ItemPointerSetInvalid(&tuple->tuple.t_self);
> +
> +    tuple->tuple.t_tableOid = InvalidOid;
> +
> +    memcpy(((char *) tuple->tuple.t_data),
> +           data + sizeof(xl_seq_rec),
> +           SizeofHeapTupleHeader);
> +
> +    memcpy(((char *) tuple->tuple.t_data) + SizeofHeapTupleHeader,
> +           data + sizeof(xl_seq_rec) + SizeofHeapTupleHeader,
> +           datalen);
> +}
> +
> +/*
> + * Handle sequence decode
> + */
> +static void
> +DecodeSequence(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
> +{
> +    ReorderBufferChange *change;
> +    RelFileNode target_node;
> +    XLogReaderState *r = buf->record;
> +    char       *tupledata = NULL;
> +    Size        tuplelen;
> +    Size        datalen = 0;
> +    uint8        info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
> +
> +    /* only decode changes flagged with XLOG_SEQ_LOG  */
> +    if (info != XLOG_SEQ_LOG)
> +        return;
> +
> +    /* only interested in our database */
> +    XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
> +    if (target_node.dbNode != ctx->slot->data.database)
> +        return;
> +
> +    /* output plugin doesn't look for this origin, no need to queue */
> +    if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
> +        return;
> +
> +    change = ReorderBufferGetChange(ctx->reorder);
> +    change->action = REORDER_BUFFER_CHANGE_SEQUENCE;
> +    change->origin_id = XLogRecGetOrigin(r);
> +
> +    memcpy(&change->data.sequence.relnode, &target_node, sizeof(RelFileNode));
> +
> +    tupledata = XLogRecGetData(r);
> +    datalen = XLogRecGetDataLen(r);
> +
> +    if(!datalen || !tupledata)
> +        return;
> +
> +    tuplelen = datalen - SizeOfHeapHeader - sizeof(xl_seq_rec);
> +
> +    change->data.sequence.newtuple =
> +        ReorderBufferGetTupleBuf(ctx->reorder, tuplelen);
> +
> +    DecodeSeqTuple(tupledata, datalen, change->data.sequence.newtuple);
> +
> +    ReorderBufferXidSetCatalogChanges(ctx->reorder, XLogRecGetXid(buf->record), buf->origptr);
> +
> +    ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
> +
> +}
> diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
> index 481277a1fd..24f2cdf51d 100644
> --- a/src/backend/replication/logical/reorderbuffer.c
> +++ b/src/backend/replication/logical/reorderbuffer.c
> @@ -474,6 +474,13 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
>          case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
>          case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
>              break;
> +        case REORDER_BUFFER_CHANGE_SEQUENCE:
> +            if (change->data.sequence.newtuple)
> +            {
> +                ReorderBufferReturnTupleBuf(rb, change->data.sequence.newtuple);
> +                change->data.sequence.newtuple = NULL;
> +            }
> +            break;
>      }
>  
>      pfree(change);
> @@ -1833,6 +1840,38 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
>                  case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
>                      elog(ERROR, "tuplecid value in changequeue");
>                      break;
> +                case REORDER_BUFFER_CHANGE_SEQUENCE:
> +                    Assert(snapshot_now);
> +
> +                    reloid = RelidByRelfilenode(change->data.sequence.relnode.spcNode,
> +                                                change->data.sequence.relnode.relNode);
> +
> +                    if (reloid == InvalidOid &&
> +                        change->data.sequence.newtuple == NULL)
> +                        goto change_done;
> +                    else if (reloid == InvalidOid)
> +                        elog(ERROR, "could not map filenode \"%s\" to relation OID",
> +                             relpathperm(change->data.tp.relnode,
> +                                         MAIN_FORKNUM));
> +
> +                    relation = RelationIdGetRelation(reloid);
> +
> +                    if (!RelationIsValid(relation))
> +                        elog(ERROR, "could not open relation with OID %u (for filenode \"%s\")",
> +                             reloid,
> +                             relpathperm(change->data.sequence.relnode,
> +                                         MAIN_FORKNUM));
> +
> +                    if (!RelationIsLogicallyLogged(relation))
> +                        goto change_done;
> +
> +                    /* user-triggered change */
> +                    if (!IsToastRelation(relation))
> +                    {
> +                        ReorderBufferToastReplace(rb, txn, relation, change);
> +                        rb->apply_change(rb, txn, relation, change);
> +                    }
> +                    break;
>              }
>          }
>  
> @@ -2516,15 +2555,23 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
>          case REORDER_BUFFER_CHANGE_UPDATE:
>          case REORDER_BUFFER_CHANGE_DELETE:
>          case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
> +        case REORDER_BUFFER_CHANGE_SEQUENCE:
>              {
>                  char       *data;
>                  ReorderBufferTupleBuf *oldtup,
>                             *newtup;
>                  Size        oldlen = 0;
>                  Size        newlen = 0;
> -
> -                oldtup = change->data.tp.oldtuple;
> -                newtup = change->data.tp.newtuple;
> +                if (change->action == REORDER_BUFFER_CHANGE_SEQUENCE)
> +                {
> +                    oldtup = NULL;
> +                    newtup = change->data.sequence.newtuple;
> +                }
> +                else
> +                {
> +                    oldtup = change->data.tp.oldtuple;
> +                    newtup = change->data.tp.newtuple;
> +                }
>  
>                  if (oldtup)
>                  {
> @@ -2707,14 +2754,23 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
>          case REORDER_BUFFER_CHANGE_UPDATE:
>          case REORDER_BUFFER_CHANGE_DELETE:
>          case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
> +        case REORDER_BUFFER_CHANGE_SEQUENCE:
>              {
>                  ReorderBufferTupleBuf *oldtup,
>                             *newtup;
>                  Size        oldlen = 0;
>                  Size        newlen = 0;
>  
> -                oldtup = change->data.tp.oldtuple;
> -                newtup = change->data.tp.newtuple;
> +                if (change->action == REORDER_BUFFER_CHANGE_SEQUENCE)
> +                {
> +                    oldtup = NULL;
> +                    newtup = change->data.sequence.newtuple;
> +                }
> +                else
> +                {
> +                    oldtup = change->data.tp.oldtuple;
> +                    newtup = change->data.tp.newtuple;
> +                }
>  
>                  if (oldtup)
>                  {
> @@ -3048,6 +3104,32 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
>          case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
>          case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
>              break;
> +        case REORDER_BUFFER_CHANGE_SEQUENCE:
> +            if (change->data.sequence.newtuple)
> +            {
> +                /* here, data might not be suitably aligned! */
> +                uint32        tuplelen;
> +
> +                memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
> +                       sizeof(uint32));
> +
> +                change->data.sequence.newtuple =
> +                    ReorderBufferGetTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);
> +
> +                /* restore ->tuple */
> +                memcpy(&change->data.sequence.newtuple->tuple, data,
> +                       sizeof(HeapTupleData));
> +                data += sizeof(HeapTupleData);
> +
> +                /* reset t_data pointer into the new tuplebuf */
> +                change->data.sequence.newtuple->tuple.t_data =
> +                    ReorderBufferTupleBufData(change->data.tp.newtuple);
> +
> +                /* restore tuple data itself */
> +                memcpy(change->data.sequence.newtuple->tuple.t_data, data, tuplelen);
> +                data += tuplelen;
> +            }
> +            break;
>      }
>  
>      dlist_push_tail(&txn->changes, &change->node);
> diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
> index 626ecf4dc9..cf3fd45c5f 100644
> --- a/src/include/replication/reorderbuffer.h
> +++ b/src/include/replication/reorderbuffer.h
> @@ -62,7 +62,8 @@ enum ReorderBufferChangeType
>      REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID,
>      REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT,
>      REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM,
> -    REORDER_BUFFER_CHANGE_TRUNCATE
> +    REORDER_BUFFER_CHANGE_TRUNCATE,
> +    REORDER_BUFFER_CHANGE_SEQUENCE,
>  };
>  
>  /* forward declaration */
> @@ -149,6 +150,15 @@ typedef struct ReorderBufferChange
>              CommandId    cmax;
>              CommandId    combocid;
>          }            tuplecid;
> +        /*
> +         * Truncate data for REORDER_BUFFER_CHANGE_SEQUENCE representing one
> +         * set of relations to be truncated.
> +         */
> +        struct
> +        {
> +            RelFileNode relnode;
> +            ReorderBufferTupleBuf *newtuple;
> +        }            sequence;
>      }            data;
>  
>      /*

Greetings,

Andres Freund



В списке pgsql-hackers по дате отправления:

Предыдущее
От: Andy Fan
Дата:
Сообщение: Re: Index Skip Scan
Следующее
От: David Rowley
Дата:
Сообщение: Re: Run-time pruning for ModifyTable